package ConnectedStreams;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 2 * @Author: 王杰
 * 3 * @Date: 2020/11/11 17:04
 * 4
 */
public class ConnectedStreamsTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * 在这个例子中，一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。
         * 一个称为 ControlFunction 的 RichCoFlatMapFunction 作用于连接的流来实现这个功能。
         */
        DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);

        control
                .connect(streamOfWords)
                .flatMap(new ControlFunction())
                .print();

        env.execute();
    }
}
